agentmux_srv\drone\executor\blocks/agent.rs
1// Copyright 2026, AgentMux Corp.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Agent block — one-shot Claude Code spawn driven by an `AgentRef`
5//! and a per-call task template.
6//!
7//! Phase 1.5 PR 2: replaces the original stub
8//! (`{ response: "[stub]" }`) with a real invocation of
9//! `agents::runner::run_agent`. The runner spawns
10//! `claude --print --output-format=stream-json`, drains its stdout
11//! through `ClaudeTranslator`, and produces a structured
12//! `AgentRunResult` that this function flattens into the
13//! snake_case drone-block output shape.
14//!
15//! Block config (`node.data`):
16//! * `task` — required. Mustache-style template resolved
17//! against `scope.outputs + scope.vars` before
18//! spawning.
19//! * `agent_ref` — optional. Object matching `AgentRef` shape
20//! (camelCase keys): identityId, memoryId,
21//! instanceName, workingDirectory. All fields
22//! optional; missing = blank agent.
23//! * `max_turns` — optional. Hard cap on claude turns.
24//!
25//! Output (snake_case to match other drone blocks — see spec
26//! §4.5):
27//!
28//! ```json
29//! {
30//! "response": "<text>",
31//! "tokens": { "input": .., "output": .., "cache_creation": .., "cache_read": .. },
32//! "cost_usd": 0.001,
33//! "status": "done"
34//! }
35//! ```
36//!
37//! Downstream blocks read `{{<this_block_id>.response}}` for the
38//! agent's reply and `{{<this_block_id>.cost_usd}}` for accounting.
39
40use serde_json::{json, Value};
41
42use crate::agents::runner::{run_agent, AgentError};
43use crate::agents::types::{AgentRef, AgentTask};
44use crate::drone::data_flow::ExecutionScope;
45use crate::drone::types::FlowNode;
46
47pub async fn run(node: &FlowNode, scope: &ExecutionScope) -> Result<Value, String> {
48 let task_raw = node
49 .data
50 .get("task")
51 .and_then(|v| v.as_str())
52 .ok_or_else(|| "agent block missing `task`".to_string())?;
53 let prompt = scope.resolve(task_raw);
54
55 let agent_ref: AgentRef = match node.data.get("agent_ref") {
56 Some(v) => serde_json::from_value(v.clone())
57 .map_err(|e| format!("agent block: invalid agent_ref: {e}"))?,
58 None => {
59 // Pre-Phase-1.5 nodes persisted a single `forge_agent_id`
60 // string; the runner can't honor it because identity and
61 // memory are now separate bundles. Surface the legacy data
62 // in the log so the user knows why the agent launches
63 // blank (#835).
64 if let Some(legacy) = node.data.get("forge_agent_id").and_then(|v| v.as_str()) {
65 if !legacy.is_empty() {
66 tracing::warn!(
67 block_id = %node.id,
68 legacy_forge_agent_id = %legacy,
69 "agent block: legacy `forge_agent_id` ignored — re-pick identity/memory after Phase 1.5 PR 3"
70 );
71 }
72 }
73 AgentRef::default()
74 }
75 };
76
77 let max_turns = node
78 .data
79 .get("max_turns")
80 .and_then(|v| v.as_u64())
81 .map(|n| n as u32);
82
83 let task = AgentTask {
84 prompt,
85 // The runner doesn't currently use the `context` map (claude
86 // takes the prompt on argv); leave it empty. Phase 2 may
87 // surface scope vars as a `system` message section.
88 context: serde_json::Map::new(),
89 max_turns,
90 };
91
92 // Forward AgentEvents into a local channel and discard for now.
93 // Phase 1.5 PR 3 will re-emit them on the `dronerun:<id>`
94 // broker so the inspector pane can render the live stream.
95 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
96 let handle = run_agent(agent_ref, task, tx).await.map_err(|e| match e {
97 AgentError::Spawn(msg) => format!("agent block: spawn failed: {msg}"),
98 AgentError::InvalidRef(msg) => format!("agent block: invalid agent ref: {msg}"),
99 })?;
100
101 // Drain the event channel concurrently so the runner's sender
102 // can make progress. Drop the events for now — the captured
103 // accumulator from `final_result` is the authoritative output.
104 tokio::spawn(async move {
105 while rx.recv().await.is_some() {}
106 });
107
108 let result = handle
109 .final_result
110 .await
111 .map_err(|e| format!("agent block: runner cancelled: {e}"))?
112 .map_err(|e| format!("agent block: agent run failed: {e}"))?;
113
114 // Manually flatten to snake_case to match other drone block
115 // outputs (the AgentRunResult's serde camelCase is for the IPC
116 // seam with the frontend, NOT for drone templates — see spec
117 // §4.5 NOTE).
118 Ok(json!({
119 "response": result.response,
120 "tokens": {
121 "input": result.tokens.input,
122 "output": result.tokens.output,
123 "cache_creation": result.tokens.cache_creation,
124 "cache_read": result.tokens.cache_read,
125 },
126 "cost_usd": result.cost_usd,
127 "status": "done",
128 }))
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use crate::drone::types::NodePosition;
135
136 fn mk_node(data: Value) -> FlowNode {
137 FlowNode {
138 id: "a1".to_string(),
139 position: NodePosition::default(),
140 data,
141 node_type: String::new(),
142 }
143 }
144
145 #[tokio::test]
146 async fn rejects_missing_task() {
147 let node = mk_node(json!({ "kind": "agent" }));
148 let scope = ExecutionScope::new();
149 let err = run(&node, &scope).await.expect_err("must error");
150 assert!(err.contains("missing `task`"), "got: {err}");
151 }
152
153 #[tokio::test]
154 async fn rejects_malformed_agent_ref() {
155 let node = mk_node(json!({
156 "kind": "agent",
157 "task": "hi",
158 "agent_ref": "not an object"
159 }));
160 let scope = ExecutionScope::new();
161 let err = run(&node, &scope).await.expect_err("must error");
162 assert!(err.contains("invalid agent_ref"), "got: {err}");
163 }
164
165 // The spawn-failure → "agent block: spawn failed: ..." mapping is
166 // covered by `agents::runner::tests::run_agent_with_bin_surfaces_spawn_failure`,
167 // which injects a nonexistent binary path via the internal
168 // `run_agent_with_bin` entry point instead of `std::env::set_var`
169 // (unsound under concurrent test execution in Rust 1.81+).
170 // Reagent P2 on PR #834.
171
172 /// Reproduces the parse-only path of `run()` for a node carrying
173 /// legacy `forge_agent_id` (no `agent_ref`). The runner shim isn't
174 /// invoked — the assertion is that we accept the node and fall back
175 /// to a default `AgentRef`. The deprecation warning fires as a side
176 /// effect; we keep this test free of `tracing` plumbing.
177 #[test]
178 fn legacy_forge_agent_id_falls_back_to_default_ref() {
179 let data = json!({
180 "kind": "agent",
181 "task": "hi",
182 "forge_agent_id": "legacy-id-123"
183 });
184 let agent_ref: AgentRef = match data.get("agent_ref") {
185 Some(v) => serde_json::from_value(v.clone()).unwrap(),
186 None => AgentRef::default(),
187 };
188 assert_eq!(agent_ref, AgentRef::default());
189 // Confirms the legacy field is still present in node.data — the
190 // production path reads it for the warn-log; tests don't.
191 assert_eq!(
192 data.get("forge_agent_id").and_then(|v| v.as_str()),
193 Some("legacy-id-123")
194 );
195 }
196}